/**
 *  Copyright 2007-2008 University Of Southern California
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package edu.isi.pegasus.planner.classes;


import edu.isi.pegasus.common.credential.CredentialHandler;
import edu.isi.pegasus.planner.catalog.classes.Profiles;

import edu.isi.pegasus.planner.catalog.classes.Profiles.NAMESPACES;
import edu.isi.pegasus.planner.namespace.Namespace;
import edu.isi.pegasus.planner.namespace.Condor;
import edu.isi.pegasus.planner.namespace.Dagman;
import edu.isi.pegasus.planner.namespace.ENV;
import edu.isi.pegasus.planner.namespace.Globus;
import edu.isi.pegasus.planner.namespace.Hints;
import edu.isi.pegasus.planner.namespace.Pegasus;

import edu.isi.pegasus.common.logging.LogManager;
import edu.isi.pegasus.planner.common.PegasusProperties;

import edu.isi.pegasus.planner.partitioner.graph.GraphNodeContent;

import edu.isi.pegasus.planner.catalog.transformation.TransformationCatalogEntry;

import edu.isi.pegasus.common.util.Separator;

import edu.isi.pegasus.planner.catalog.site.classes.GridGateway;
import edu.isi.pegasus.planner.dax.Invoke;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.HashSet;


import java.io.Writer;
import java.io.StringWriter;
import java.io.IOException;
import java.util.Collection;

/**
 * The object of this class holds the information required to generate a
 * submit file for one particular job of the DAG.
 *
 * @author Karan Vahi
 * @author Gaurang Mehta
 * @version $Revision$
 */

public class Job extends Data implements GraphNodeContent{

    /**
     * Denotes a job that does not fall into the other categories. It might
     * denote an error condition or a faulty logic in the planner.
     */
    public static final int UNASSIGNED_JOB  = 0;

    /**
     * Denotes a compute job. Generally these are the jobs that are specified in
     * the DAX.
     */
    public static final int COMPUTE_JOB     = 1;

    /**
     * Denotes a job that is used to stage in the input files for a compute job.
     */
    public static final int STAGE_IN_JOB    = 2;

    /**
     * Denotes a job that transfers the data generated by a compute job to the
     * output pool specified by the user.
     */
    public static final int STAGE_OUT_JOB   = 3;

    /**
     * Denotes a job that registers in the replica mechanism the materialized
     * files.
     */
    public static final int REPLICA_REG_JOB = 4;

    /**
     * Denotes a job that transfers the output of a compute node to the site
     * where the child compute node is to be generated.
     */
    public static final int INTER_POOL_JOB  = 5;

    /**
     * Denotes a job that creates directories at the remote pools.
     */
    public static final int CREATE_DIR_JOB  = 6;

    /**
     * Denotes a job that stages the worker package.
     */
    public static final int STAGE_IN_WORKER_PACKAGE_JOB = 7;

    /**
     * Denotes a job for which the executable has been staged as part of the
     * workflow.
     */
//    public static final int STAGED_COMPUTE_JOB = 7;


    /**
     * Denotes a cleanup job, that removes files from the remote working
     * directories of the remote sites.
     */
    public static final int CLEANUP_JOB = 8;

    /**
     * Denotes a chmod job that sets the xbit on the remote end.
     */
    public static final int CHMOD_JOB = 9;


    /**
     * Denotes a job that corresponds to a sub workflow specified as a DAX.
     */
    public static final int DAX_JOB = 10;


    /**
     * Denotes a job that corresponds to a sub workflow specified as a DAG.
     */
    public static final int DAG_JOB = 11;

    /**
     * Maps a Job type constant to the corresponding GridGateway job type.
     *
     * Stage-in, stage-out and inter-pool jobs all map to the transfer type;
     * compute, dax, dag and unassigned (and any unrecognised in-range value)
     * map to compute.
     *
     * @param type   the job type
     *
     * @return corresponding GridGateway job type
     *
     * @throws IllegalArgumentException if the job type is out of range.
     */
    private static GridGateway.JOB_TYPE jobType2GridGatewayJobType( int type ){

        //sanity check
        if ( !( typeInRange(type) ) ){
            throw new IllegalArgumentException("Invalid Job type " + type);
        }

        switch( type ){
            case Job.STAGE_IN_JOB:
            case Job.STAGE_OUT_JOB:
            case Job.INTER_POOL_JOB:
                return GridGateway.JOB_TYPE.transfer;

            case Job.REPLICA_REG_JOB:
                return GridGateway.JOB_TYPE.register;

            case Job.CREATE_DIR_JOB:
            case Job.CHMOD_JOB:
                return GridGateway.JOB_TYPE.auxillary;

            case Job.CLEANUP_JOB:
                return GridGateway.JOB_TYPE.cleanup;

            /*
            case Job.SYMLINK_STAGE_IN_JOB:
                return GridGateway.JOB_TYPE.transfer;
            */

            case Job.COMPUTE_JOB:
            case Job.DAG_JOB:
            case Job.DAX_JOB:
            case Job.UNASSIGNED_JOB:
            default:
                return GridGateway.JOB_TYPE.compute;
        }
    }


    /**
     * The delimiter that has to be used to combine the name for the staged
     * executable. Initialised once from the properties at class load time.
     *
     * NOTE(review): never reassigned within this chunk — looks like it could
     * be declared final; confirm no reassignment elsewhere before changing.
     */
    private static String DELIMITER =
        PegasusProperties.getInstance().getStagingDelimiter();


    /**
     * The type of the job. Pegasus tags the jobs according to the function of
     * the job. The jobs are tagged according to the functionality they serve in
     * the Pegasus super node. The job class can be
     *
     * unassigned
     * compute job
     * stage-in
     * stage-out
     * replica registration
     * inter-pool transfer
     * create-dir job
     * staged-compute job
     */
    public int jobClass;

    /**
     * Identifies which Pegasus Super Node a job is associated with.
     * A Pegasus Supernode is identified by the jobName of the compute node in
     * the super node.
     */
    public String jobID;


    /**
     * The name of the job.
     */
    public String jobName;

    /**
     * The logical name of the transformation which is executed as a part of
     * this job. Note: The tc is looked up by namespace__logicalName_version.
     */
    public String logicalName;

    /**
     * The logical id of the job as referred to in the dax.
     */
    public String logicalId;

    /**
     * The namespace to which the transformation is bound.
     */
    public String namespace;

    /**
     * The version of the transformation.
     */
    public String version;

    /**
     * The name of the derivation in Chimera that generated the job.
     */
    public String dvName;

    /**
     * The namespace to which the derivation is bound.
     *
     */
    public String dvNamespace;

    /**
     * The version of the derivation.
     */
    public String dvVersion;


    /**
     * The globus Scheduler (jobmanager) for the job.
     */
    public String globusScheduler;

    /**
     * The path of the executable on the machine at which the job is executed.
     */
    public String executable;

    /**
     * The universe in which the job has to be executed. Can be standard,
     * vanilla or globus.
     */
    public String condorUniverse;

    /**
     * File which contains stdin (keyboard input).
     */
    public String stdIn;

    /**
     * File which contains stdout.
     */
    public String stdOut;

    /**
     * File which contains standard error.
     */
    public String stdErr;

    /**
     * The arguments for the job. This string is put in the arguments field
     * of the Condor Submit File.
     */
    public String strargs;



    /**
     * Contains the input files for the submit file. It is a set of
     * PegasusFile objects which store the transiency information of each
     * logical file.
     *
     * @see org.griphyn.cPlanner.classes.PegasusFile
     */
    public Set inputFiles;

    /**
     * Contains the output files for the submit file. It is a set of
     * PegasusFile objects which store the transiency information of each
     * logical file.
     *
     * @see org.griphyn.cPlanner.classes.PegasusFile
     */
    public Set outputFiles;

    /**
     * The pool (site) on which this job has been decided to be executed by
     * the Interpool Engine.
     *
     */
    public String executionPool;

    //the namespace variables

    /**
     * The namespace object containing the globus rsl attributes which the user
     * specifies in the dax, or the pool file or the properties file.
     */
    public Globus globusRSL;

    /**
     * For Condor Namespace. This contains the extra Condor options which one
     * may want to specify. These are copied straightaway to the Submit file.
     */
    public Condor condorVariables;

    /**
     * To accommodate the environment variables which might need to be set.
     */
    public ENV envVariables;

    /**
     * The DAGMAN namespace profile variable holding the dagman profiles.
     * It holds the prescripts and the postscripts for the jobs.
     */
    public Dagman dagmanVariables;

    /**
     * To accommodate all the hints that may be passed through the DAX.
     */
    public Hints hints;

    /**
     * The Pegasus namespace variable.
     */
    public Pegasus vdsNS;

    /**
     * Identifies the level of the job in the dax. The level is bottom up
     * from the final child node.
     */
    public int level;

    /**
     * The expected runtime for a job. A negative value means unknown.
     */
    private double mRuntime;

    /**
     * Boolean indicating whether the job executables were staged for it or not.
     */
    private boolean mJobExecutablesStaged;


    /**
     * All the notifications associated with the job.
     */
    private Notifications mNotifications;

    /**
     * The staging site associated with the job.
     */
    private String mStagingSite;

    /**
     * The directory in which the job should run.
     */
    private String mDirectory;

    /**
     * The relative path to the submit directory for the job, from the workflows
     * base submit directory.
     */
//    private String submitDirectory;


    /**
     * Set of credential types required by a job.
     */
    private Set<CredentialHandler.TYPE> mCredentialsType;

    /**
     * Initialises the member variables: empty strings for name/handle fields,
     * empty collections for files, notifications and credentials, fresh
     * namespace objects, an UNASSIGNED_JOB job class and sentinel value -1
     * for level and runtime.
     */
    public Job() {
        jobName          = "";
        namespace        = "";
        logicalName      = "";
        logicalId        = "";
        version          = "";
        dvName           = null;
        dvNamespace      = null;
        dvVersion        = null;
        jobID            = "";
        globusScheduler  = "";
        executable       = "";
        condorUniverse   = "";
        stdIn            = "";
        stdOut           = "";
        stdErr           = "";
        inputFiles       = new HashSet();
        outputFiles      = new HashSet();
        strargs          = "";
        envVariables     = new ENV();
        //string literal instead of a needless new String() allocation
        executionPool    = "";
        globusRSL        = new Globus();
        condorVariables  = new Condor();
        dagmanVariables  = new Dagman();
        hints            = new Hints();
        vdsNS            = new Pegasus();
        jobClass         = UNASSIGNED_JOB;
        level            = -1;
        mRuntime         = -1;
        mJobExecutablesStaged = false;
        mNotifications   = new Notifications();
        mStagingSite     = null;
        mDirectory       = null;
        mCredentialsType = new HashSet<CredentialHandler.TYPE>();
//        submitDirectory  = null;
    }

    /**
     * Overloaded constructor. Does a shallow copy of the job object passed:
     * all reference-typed members (file sets, namespaces, notifications) are
     * shared with the source job, not cloned.
     *
     * @param job the <code>Job</code> object containing the job description.
     */
    public Job(Job job){
        jobName          = job.jobName;
        namespace        = job.namespace;
        logicalName      = job.logicalName;
        logicalId        = job.logicalId;
        version          = job.version;
        dvName           = job.dvName;
        dvNamespace      = job.dvNamespace;
        dvVersion        = job.dvVersion;
        jobID            = job.jobID;
        globusScheduler  = job.globusScheduler;
        executable       = job.executable;
        condorUniverse   = job.condorUniverse;
        stdIn            = job.stdIn;
        stdOut           = job.stdOut;
        stdErr           = job.stdErr;
        inputFiles       = job.inputFiles;
        outputFiles      = job.outputFiles;
        strargs          = job.strargs;
        envVariables     = job.envVariables;
        executionPool    = job.executionPool;
        globusRSL        = job.globusRSL;
        condorVariables  = job.condorVariables;
        dagmanVariables  = job.dagmanVariables;
        hints            = job.hints;
        vdsNS            = job.vdsNS;
        jobClass         = job.getJobType();
        level            = job.level;
        mRuntime         = job.mRuntime;
        mJobExecutablesStaged = job.mJobExecutablesStaged;
        mNotifications   = job.mNotifications;
        mStagingSite     = job.mStagingSite;
        mDirectory       = job.mDirectory;
        //copy over the credential types instead of starting with an empty
        //set, so that credentials determined earlier are not silently lost.
        //this matches what clone() does.
        mCredentialsType = new HashSet<CredentialHandler.TYPE>( job.getCredentialTypes() );
//        submitDirectory  = job.submitDirectory;
    }

    /**
     * Returns a new copy of the Object. This is a deep copy for the file sets
     * (each PegasusFile is cloned), the namespace objects (ENV, Globus,
     * Condor, Dagman, Pegasus, Hints), the notifications and the credential
     * types; String and primitive members are copied by value/reference.
     *
     * @return clone of the object.
     */
    public Object clone(){
        Job newSub = new Job();

        newSub.condorUniverse = this.condorUniverse;
        newSub.envVariables   = (ENV)this.envVariables.clone();
        newSub.executable     = this.executable;
        newSub.globusScheduler= this.globusScheduler;
        //deep copy the input and output file sets, cloning each file
        for(Iterator it = this.inputFiles.iterator(); it.hasNext(); ){
            newSub.addInputFile( (PegasusFile)((PegasusFile)it.next()).clone());
        }
        for(Iterator it = this.outputFiles.iterator(); it.hasNext(); ){
            newSub.addOutputFile( (PegasusFile)((PegasusFile)it.next()).clone());
        }

        newSub.jobName        = this.jobName;
        newSub.logicalName    = this.logicalName;
        newSub.logicalId      = this.logicalId;
        newSub.stdErr         = this.stdErr;
        newSub.stdIn          = this.stdIn;
        newSub.stdOut         = this.stdOut;
        newSub.strargs        = this.strargs;
        newSub.executionPool  = this.executionPool;
        //namespace objects are cloned, preserving null where unset
        newSub.globusRSL      = this.globusRSL == null ? null : (Globus)this.globusRSL.clone();

        newSub.condorVariables= this.condorVariables == null ? null :
                                                        (Condor)this.condorVariables.clone();
        newSub.dagmanVariables= this.dagmanVariables == null ? null :
                                                        (Dagman)this.dagmanVariables.clone();
        newSub.vdsNS          = this.vdsNS == null ? null :(Pegasus)this.vdsNS.clone();

        newSub.hints          = (Hints)this.hints.clone();
        newSub.jobID          = this.jobID;
        newSub.jobClass       = this.jobClass;

        newSub.dvName       = this.dvName;
        newSub.namespace    = this.namespace;
        newSub.version      = this.version;
        newSub.dvNamespace  = this.dvNamespace;
        newSub.dvVersion    = this.dvVersion;

        newSub.level        = this.level;
        newSub.mRuntime = this.mRuntime;
//        newSub.submitDirectory = this.submitDirectory == null ? null : new String(this.submitDirectory);


        newSub.mJobExecutablesStaged = this.mJobExecutablesStaged;
        newSub.mNotifications = (Notifications)this.getNotifications().clone();
        newSub.mStagingSite   = this.mStagingSite;

        newSub.mDirectory       = this.mDirectory;

        //copy over the credential types individually into the fresh set
        for( CredentialHandler.TYPE type : this.getCredentialTypes()  ){
            newSub.addCredentialType( type );
        }

        return newSub;
    }
    
    /**
     * Sets the expected runtime for the job.
     *
     * A null or empty value is treated as "unknown" and stored as -1
     * (previously an empty string resulted in a NumberFormatException).
     *
     * @param runtime  the runtime for the job.
     *
     * @throws NumberFormatException if the value is non-empty but not a
     *         parsable double.
     */
    public void setRuntime( String  runtime ) {
        if( runtime == null || runtime.length() == 0 ){
            mRuntime = -1;
        }
        else{
            mRuntime = Double.parseDouble( runtime );
        }
    }

    /**
     * Sets the expected runtime for the job.
     *
     * @param runtime  the runtime for the job.
     */
    public void setRuntime( double  runtime ) {
        this.mRuntime = runtime;
    }

    /**
     * Returns the expected runtime for the job as set via the setRuntime
     * methods. A negative value means the runtime is unknown.
     *
     * @return   the runtime for the job.
     */
    public double getRuntime( ) {
        return this.mRuntime;
    }
    
    /**
     * Returns the runtime associated with the job. If the runtime member is
     * negative (unknown), it attempts to look up the Pegasus profile key
     * runtime for the job; when a value is found it is stored via the
     * setRuntime( String ) method and returned.
     *
     * @return the expected runtime, or -1 if no value could be determined.
     * @see org.griphyn.cPlanner.namespace.Pegasus#RUNTIME_KEY
     */
    public double computeRuntime( ){
        if( mRuntime >= 0 ){
            return mRuntime;
        }

        //attempt to look up the value from the pegasus profile
        String value = this.vdsNS.getStringValue( Pegasus.RUNTIME_KEY );
        if( value == null ){
            return -1;
        }

        setRuntime( value );
        return mRuntime;
    }
    
    /**
     * Sets the condor universe associated with the job.
     *
     * @param universe   the universe to be associated.
     */
    public void setUniverse( String universe ){
        condorUniverse = universe;
    }

    /**
     * Returns the condor universe associated with the job.
     *
     * @return  the universe associated with the job.
     */
    public String getUniverse(  ){
        return condorUniverse;
    }


    /**
     * Sets the executable staging flag for the job.
     *
     * @param value  whether the user executables are staged for the job.
     */
    public void setExecutableStagingForJob( boolean value ){
        this.mJobExecutablesStaged = value;
    }

    /**
     * Returns whether user executables need to be staged for the job or not.
     *
     * @return user executable staging flag.
     */
    public boolean userExecutablesStagedForJob(){
        return this.mJobExecutablesStaged;
    }

    /**
     * Adds an input file to the underlying collection of input files
     * associated with the job.
     *
     * @param file the <code>PegasusFile</code> containing the input file.
     */
    public void addInputFile(PegasusFile file){
        inputFiles.add( file );
    }



    /**
     * Sets the input files associated with the job, replacing any existing
     * set.
     *
     * @param ipFiles  Set of <code>PegasusFile</code> objects containing the input files.
     */
    public void setInputFiles( Set ipFiles ){
        inputFiles = ipFiles;
    }

    /**
     * Returns the set of input files associated with the job. The live
     * internal set is returned, not a copy.
     *
     * @return Set of <code>PegasusFile</code> objects containing the input files.
     */
    public Set getInputFiles( ){
        return inputFiles;
    }

    
    /**
     * Resets the notifications associated with the job to an empty container.
     */
    public void resetNotifications( ){
        mNotifications = new Notifications();
    }

    /**
     * Adds an Invoke object corresponding to a single notification.
     *
     * @param invoke  the invoke object containing the notification
     */
    public void addNotification( Invoke invoke ){
        mNotifications.add( invoke );
    }

    /**
     * Adds all the notifications specified in the TransformationCatalogEntry
     * to the underlying job notifications.
     *
     * @param entry     the TransformationCatalogEntry object
     */
    public void addNotifications( TransformationCatalogEntry entry ){
        mNotifications.addAll( entry.getNotifications() );
    }
    
    /**
     * Adds all the notifications passed to the underlying container.
     *
     * @param invokes  the notifications to be added
     */
    public void addNotifications( Notifications invokes  ){
        mNotifications.addAll( invokes );
    }

    /**
     * Returns a collection of all the notifications that need to be
     * done for a particular condition.
     *
     * @param when  the condition
     *
     * @return the matching notifications
     */
    public Collection<Invoke> getNotifications( Invoke.WHEN when ){
        return mNotifications.getNotifications( when );
    }

    /**
     * Returns all the notifications associated with the job.
     *
     * @return the notifications container
     */
    public Notifications getNotifications(  ){
        return mNotifications;
    }


    /**
     * Looks at a URL to determine whether a credential should be associated
     * with the job, keyed off the URL scheme prefix: gsiftp implies an x509
     * proxy, s3 an s3 credential, irods an irods credential and scp an ssh
     * key. Any other (or null) URL adds nothing.
     *
     * @param url   the url for which a credential needs to be added
     */
    public void addCredentialType( String url ){
        //sanity check
        if( url == null ){
            return;
        }

        CredentialHandler.TYPE type = null;
        if( url.startsWith( "gsiftp" ) ){
            type = CredentialHandler.TYPE.x509;
        }
        else if( url.startsWith( "s3" ) ){
            type = CredentialHandler.TYPE.s3;
        }
        else if( url.startsWith( "irods" ) ){
            type = CredentialHandler.TYPE.irods;
        }
        else if( url.startsWith( "scp" ) ){
            type = CredentialHandler.TYPE.ssh;
        }

        if( type != null ){
            this.addCredentialType( type );
        }
    }

    /**
     * Adds a type of credential that will be required by the job.
     *
     * @param type  the credential type.
     */
    public void addCredentialType( CredentialHandler.TYPE type ){
        mCredentialsType.add( type );
    }


    /**
     * Returns the various credential types required by the job. The live
     * internal set is returned, not a copy.
     *
     * @return the set of credentials required.
     */
    public Set<CredentialHandler.TYPE> getCredentialTypes(  ){
        return mCredentialsType;
    }

    /**
     * Resets the credential types required by the job by emptying the
     * underlying set.
     */
    public void resetCredentialTypes(){
        mCredentialsType.clear();
    }


    /**
     * Adds an output file to the underlying collection of output files
     * associated with the job.
     *
     * @param file the <code>PegasusFile</code> containing the output file.
     */
    public void addOutputFile(PegasusFile file){
        outputFiles.add( file );
    }


    /**
     * Sets the output files associated with the job, replacing any existing
     * set.
     *
     * @param opFiles  Set of <code>PegasusFile</code> objects containing the
     *                 output files.
     */
    public void setOutputFiles( Set opFiles ){
        outputFiles = opFiles;
    }

    /**
     * Returns the set of output files associated with the job. The live
     * internal set is returned, not a copy.
     *
     * @return Set of <code>PegasusFile</code> objects containing the output files.
     */
    public Set getOutputFiles( ){
        return outputFiles;
    }




    /**
     * Sets the type of the job.
     *
     * @param type  the type of the job.
     * @exception IllegalArgumentException if the job type is outside its legal
     * range.
     *
     * @see #UNASSIGNED_JOB
     * @see #COMPUTE_JOB
     * @see #STAGE_IN_JOB
     * @see #STAGE_OUT_JOB
     * @see #REPLICA_REG_JOB
     * @see #INTER_POOL_JOB
     * @see #CREATE_DIR_JOB
     * @see #CLEANUP_JOB
     */
    public void setJobType(int type){
        //guard clause - reject out of range values up front
        if( !typeInRange( type ) ){
            throw new IllegalArgumentException("Invalid Job type " + type);
        }
        jobClass = type;
    }

    /**
     * Sets the site handle of the site where the job is to be executed.
     *
     * @param site  the site handle.
     */
    public void setSiteHandle(String site){
        executionPool = site;
    }

    /**
     * Returns the handle of the site where the job is scheduled.
     *
     * @return site handle.
     */
    public String getSiteHandle(){
        return this.executionPool;
    }


    /**
     * Sets the path to the executable on the remote grid site. This executable
     * is invoked whenever a job is run on the remote grid site.
     *
     * @param path  the path to the underlying transformation on the remote grid
     *              site.
     *
     * @see #getSiteHandle()
     */
    public void setRemoteExecutable( String path ){
        executable = path;
    }

    /**
     * Returns the path of the underlying executable on the remote grid site.
     *
     * @return the path to the executable if set.
     */
    public String getRemoteExecutable( ){
        return executable;
    }


    /**
     * Sets the remote jobmanager on which the job has to run.
     *
     * @param jobmanager the jobmanager url.
     *
     * @see #getJobManager()
     */
    public void setJobManager( String jobmanager ){
        globusScheduler = jobmanager;
    }

    /**
     * Returns the remote jobmanager on which the job has to run.
     *
     * @return the jobmanager url.
     *
     * @see #setJobManager(java.lang.String)
     */
    public String getJobManager( ){
        return globusScheduler;
    }



    /**
     * Sets the file to which the stdout of the job needs to be written to
     * at the remote grid site. Should be just the basename. The file appears
     * in the remote working directory for that job.
     *
     * @param fileName  the basename of the file.
     */
    public void setStdOut( String fileName ){
        stdOut = fileName;
    }

    /**
     * Returns the file to which the stdout of the job is written to.
     *
     * @return  the basename of the file.
     */
    public String getStdOut( ){
        return stdOut;
    }

    /**
     * Sets the file to which the stderr of the job needs to be written to
     * at the remote grid site. Should be just the basename. The file appears
     * in the remote working directory for that job.
     *
     * @param fileName  the basename of the file.
     */
    public void setStdErr( String fileName ){
        stdErr = fileName;
    }

    /**
     * Returns the file to which the stderr of the job is written to.
     *
     * @return  the basename of the file.
     */
    public String getStdErr( ){
        return stdErr;
    }


    /**
     * Sets the file from which to pick up the stdin for the job. The file
     * is tracked via Replica Catalog, and is staged to the remote grid site.
     *
     * @param fileName  the basename of the file.
     */
    public void setStdIn( String fileName ){
        stdIn = fileName;
    }

    /**
     * Returns the file from which the stdin is picked up.
     *
     * @return  the basename of the file.
     */
    public String getStdIn( ){
        return stdIn;
    }


    /**
     * Returns the ID associated with the job. Unfortunately currently it is
     * simply the job name, as returned by getName().
     *
     * @return the ID of the job.
     */
    public String getID(){
        return this.getName();
    }

    /**
     * Sets the staging site handle for the job.
     *
     * @param site    the staging site handle
     */
    public void setStagingSiteHandle( String site ){
        mStagingSite = site;
    }


    /**
     * Returns the staging site handle for the job.
     *
     * @return    the staging site handle
     */
    public String getStagingSiteHandle( ){
        return mStagingSite;
    }

    /**
     * Returns the name of the job.
     *
     * @return the job name
     */
    public String getName(){
        return this.jobName;
    }

    /**
     * Setter method to set the name of the job.
     *
     * @param name   the name of the job.
     */
    public void setName(String name){
        this.jobName = name;
    }

    /**
     * Returns the directory in which the job runs.
     *
     * @return the directory, or null if not set
     */
    public String getDirectory(){
        return this.mDirectory;
    }

    /**
     * Sets the directory in which the job should run.
     *
     * @param directory   the directory for the job.
     */
    public void setDirectory( String directory ){
        this.mDirectory = directory;
    }


    /**
     * Returns the logical id of the job as referred to in the dax.
     *
     * @return the logical id
     */
    public String getLogicalID(){
        return this.logicalId;
    }
    
    /**
     * Returns the DAX ID for the job if it appeared in the DAX.
     *
     * Note: for jobs that did not appear in the DAX (auxillary jobs) the
     * method returns the literal string "null", not a null reference.
     *
     * @return  the id of the job in the DAX if present, else the string "null"
     */
    public String getDAXID(){
        int type = this.getJobType();

        //only compute, dag and dax jobs map back to entries in the DAX.
        //dax and dag jobs are actually never launched via kickstart as of now.
        boolean inDAX = type == Job.COMPUTE_JOB ||
                        type == Job.DAG_JOB ||
                        type == Job.DAX_JOB;

        //String.valueOf renders a null logical id as the text "null" as well,
        //matching the StringBuffer append semantics
        return inDAX ? String.valueOf( this.getLogicalID() ) : "null";
    }

    /**
     * Setter method to set the logical id of the job.
     *
     * @param id  the logical id of the job.
     */
    public void setLogicalID(String id){
        this.logicalId = id;
    }

    /**
     * Returns the name of the compute job of the Pegasus supernode containing
     * this job.
     *
     * @return the supernode job name
     */
    public String getVDSSuperNode(){
        return jobID;
    }

    /**
     * Setter method for the name of the compute job of the Pegasus supernode
     * containing this job.
     *
     * @param name  the name of the job.
     */
    public void setVDSSuperNode( String name ){
        jobID = name;
    }



    /**
     * Returns the type of the job, i.e. the value of the jobClass member.
     *
     * @return int value of job class.
     */
    public int getJobType(){
        return jobClass;
    }

    /**
     * Returns the corresponding grid gateway job type. An explicit hint
     * ( Hints.GRID_JOB_TYPE_KEY ) takes precedence over the mapping derived
     * from the job type (JIRA PM-277).
     *
     * @return grid gateway job type
     */
    public GridGateway.JOB_TYPE getGridGatewayJobType(){

        //honor an explicitly specified hint first - JIRA PM-277
        if( this.hints.containsKey( Hints.GRID_JOB_TYPE_KEY ) ){
            return GridGateway.JOB_TYPE.valueOf(
                       (String)this.hints.get( Hints.GRID_JOB_TYPE_KEY ) );
        }

        //fall back on the mapping from the job class
        return Job.jobType2GridGatewayJobType( this.getJobType() );
    }

    /**
     * Gets the textual description of the type associated with this job.
     *
     * @return the textual description of this job's type.
     */
    public String getJobTypeDescription(){
        return this.getJobTypeDescription( jobClass );
    }


    /**
     * Gets the textual description of the type that can be associated
     * with a job.
     *
     * @param  type  the type of the job.
     *
     * @return the textual description, "unknown" for unrecognized types.
     */
    public String getJobTypeDescription(int type){
        //direct returns instead of assigning to a local and breaking
        switch (type){
            case COMPUTE_JOB:
                return "compute";
            case STAGE_IN_JOB:
                return "stage-in-tx";
            case STAGE_OUT_JOB:
                return "stage-out-tx";
            case INTER_POOL_JOB:
                return "inter-site-tx";
            case REPLICA_REG_JOB:
                return "registration";
            case UNASSIGNED_JOB:
                return "unassigned";
            case CREATE_DIR_JOB:
                return "create-dir";
            case CLEANUP_JOB:
                return "cleanup";
            case CHMOD_JOB:
                return "chmod";
            case DAX_JOB:
                return "dax";
            case DAG_JOB:
                return "dag";
            default:
                return "unknown";
        }
    }




    /**
     * Returns the namespace of the underlying transformation.
     *
     * @return the transformation namespace.
     */
    public String getTXNamespace(){
        return this.namespace;
    }


    /**
     * Sets the transformation namespace associated with the job.
     *
     * @param ns  the namespace.
     */
    public void setTXNamespace( String ns ){
        namespace = ns;
    }


    /**
     * Returns the logical name of the underlying transformation.
     *
     * @return the transformation name.
     */
    public String getTXName(){
        return this.logicalName;
    }

    /**
     * Sets the logical name of the underlying transformation.
     *
     * @param name  the logical name of the transformation.
     */
    public void setTXName(String name){
        logicalName = name;
    }

    /**
     * Returns the version of the underlying transformation.
     *
     * @return the transformation version.
     */
    public String getTXVersion(){
        return this.version;
    }

    /**
     * Sets the version of the underlying transformation.
     *
     * @param vs  the version.
     */
    public void setTXVersion(String vs){
        version = vs;
    }

    /**
     * Sets all the attributes of the underlying transformation in one call.
     *
     * @param ns   the namespace of the transformation.
     * @param name the logical name of the transformation.
     * @param vs   the version of the transformation.
     */
    public void setTransformation( String ns, String name, String vs){
        setTXNamespace( ns );
        setTXName( name );
        setTXVersion( vs );
    }

    /**
     * Constructs the fully qualified transformation name used to query the
     * Transformation Catalog, including namespace and version.
     *
     * @return the complete transformation name.
     */
    public String getCompleteTCName(){
        return Separator.combine( this.namespace, this.logicalName, this.version );
    }

    /**
     * Returns the namespace of the underlying derivation.
     *
     * @return the derivation namespace.
     */
    public String getDVNamespace(){
        return this.dvNamespace;
    }

    /**
     * Sets the derivation namespace associated with the job.
     *
     * @param ns  the namespace.
     */
    public void setDVNamespace( String ns ){
        dvNamespace = ns;
    }

    /**
     * Returns the logical name of the underlying derivation.
     *
     * @return the derivation name.
     */
    public String getDVName(){
        return this.dvName;
    }

    /**
     * Sets the logical name of the underlying derivation.
     *
     * @param name  the logical name of the derivation.
     */
    public void setDVName(String name){
        dvName = name;
    }

    /**
     * Returns the version of the underlying derivation.
     *
     * @return the derivation version.
     */
    public String getDVVersion(){
        return this.dvVersion;
    }

    /**
     * Sets the version of the underlying derivation.
     *
     * @param vs  the version.
     */
    public void setDVVersion(String vs){
        dvVersion = vs;
    }

    /**
     * Sets all the attributes of the underlying derivation in one call.
     *
     * @param ns   the namespace of the derivation.
     * @param name the logical name of the derivation.
     * @param vs   the version of the derivation.
     */
    public void setDerivation( String ns, String name, String vs){
        setDVNamespace( ns );
        setDVName( name );
        setDVVersion( vs );
    }


    /**
     * Returns the level associated with the job.
     *
     * @return int designating the level.
     */
    public int getLevel( ){
        return this.level;
    }

    /**
     * Sets the level for the job.
     *
     * @param value the level.
     */
    public void setLevel( int value ){
        this.level = value;
    }

    /**
     * Constructs the fully qualified name of the derivation used to
     * generate this job in Chimera, including namespace and version.
     *
     * @return the complete derivation name, or null if no derivation
     *         name is set.
     */
    public String getCompleteDVName(){
        if( dvName == null ){
            return null;
        }
        return Separator.combine( dvNamespace, dvName, dvVersion );
    }



    /**
     * Returns the basename for the staged executable corresponding to
     * this job's transformation.
     *
     * @return the staged executable basename.
     */
    public  String getStagedExecutableBaseName(){
        return getStagedExecutableBaseName( this.namespace, this.logicalName, this.version );
    }


    /**
     * Returns the basename for the staged executable corresponding to a
     * transformation triple.
     *
     * @param txNamespace is the namespace in which the TR resides, may be null.
     * @param txName      is the base name of the transformation, must not be null.
     * @param txVersion   is the version of the TR, may be null.
     *
     * @return the staged executable basename.
     */
    public static String getStagedExecutableBaseName( String txNamespace, String txName, String txVersion ){
        return combine( txNamespace, txName, txVersion );
    }


    /**
     * Returns the submit directory path relative to the workflow submit
     * directory.
     *
     * @return  the directory name, if set else null.
     */
//    public String getSubmitDirectory(){
//        return this.submitDirectory;
//    }

    /**
     * Sets the submit directory path relative to the workflow submit
     * directory.
     *
     * @param directory  the directory name.
     */
//    public void setSubmitDirectory(String directory){
//        this.submitDirectory = directory;
//    }

    /**
     * Returns the argument string with which the job has to be invoked.
     *
     * @return the argument string.
     */
    public String getArguments(){
        return strargs;
    }

    /**
     * Sets the argument string with which the job has to be invoked.
     *
     * @param arguments the argument string.
     */
    public void setArguments(String arguments){
        strargs = arguments;
    }


    /**
     * Combines the three components together into a single string as
     * namespace-name-version, omitting the delimiter for a null or empty
     * namespace/version.
     *
     * @param namespace is the namespace in which the TR resides, may be null.
     * @param name      is the base name of the transformation, must not be null.
     * @param version   is the version of the TR, may be null.
     *
     * @return the concatenated form.
     */
    private static String combine(String namespace,
                                  String name,
                                  String version) {
        //StringBuilder instead of StringBuffer: the buffer is method-local,
        //so the synchronization StringBuffer provides is pure overhead.
        StringBuilder result = new StringBuilder(32);
        if ( namespace != null && namespace.length() > 0 ) {
            result.append( namespace ).append( DELIMITER );
        }
        result.append(name);
        if (version != null && version.length() > 0 ) {
            result.append( DELIMITER ).append( version );
        }
        return result.toString();
    }

    /**
     * Sets the prescript for the job with an empty argument string.
     *
     * @param path  the path to the script that has to be run as a prescript.
     */
    public void setPreScript(String path) {
        this.setPreScript( path, "" );
    }


    /**
     * Sets the prescript for the job by recording it in the dagman
     * profile namespace.
     *
     * @param path      the path to the script that has to be run as a prescript.
     * @param arguments the arguments to the prescript.
     */
    public void setPreScript(String path, String arguments){
        //construct directly as we know the keys are valid
        dagmanVariables.construct( Dagman.PRE_SCRIPT_KEY, path );
        dagmanVariables.construct( Dagman.PRE_SCRIPT_ARGUMENTS_KEY, arguments );
    }

    /**
     * Returns the path to the prescript for the job if set.
     *
     * @return  the path to the script that has to be run as a prescript,
     *          else null if no prescript has been set.
     */
    public String getPreScriptPath(){
        //casting null is a no-op, so no explicit null check is required
        return (String)dagmanVariables.get( Dagman.PRE_SCRIPT_KEY );
    }

    /**
     * Returns the arguments to the prescript for the job if set.
     *
     * @return  the arguments to the prescript, else null if no prescript
     *          has been set.
     */
    public String getPreScriptArguments(){
        //casting null is a no-op, so no explicit null check is required
        return (String)dagmanVariables.get( Dagman.PRE_SCRIPT_ARGUMENTS_KEY );
    }

    /**
     * Returns whether the job is recursive or not, i.e. whether the
     * pegasus profile type key is set to "recursive".
     *
     * @return boolean
     */
    public boolean typeRecursive(){
        //constant-first equals avoids a NullPointerException when the key
        //is present but maps to null; the redundant "? x : false" is gone.
        return this.vdsNS.containsKey( Pegasus.TYPE_KEY ) &&
               "recursive".equals( this.vdsNS.getStringValue( Pegasus.TYPE_KEY ) );
    }

    /**
     * Marks the job as recursive by setting the pegasus profile type key.
     */
    public void setTypeRecursive(){
        vdsNS.construct( Pegasus.TYPE_KEY, "recursive" );
    }
    
    /**
     * Returns whether the job type value for the job is in range or not.
     *
     * @param type the job type.
     *
     * @return true if the value is in [UNASSIGNED_JOB, DAG_JOB],
     *         false otherwise.
     */
    public static boolean typeInRange(int type){
        return type >= Job.UNASSIGNED_JOB && type <= Job.DAG_JOB;
    }


    /**
     * Updates all the profile namespaces with the information associated in
     * the transformation catalog for this job. Existing information is
     * updated and supplemental new information from the catalog is added.
     * The method does not check whether the entry passed refers to this
     * job; the caller must ensure that.
     *
     * @param  entry  the <code>TCEntry</code> object corresponding to the
     *                entry in the Transformation Catalog for the job.
     */
    public void updateProfiles(TransformationCatalogEntry entry){
        this.condorVariables.checkKeyInNS( entry );
        this.dagmanVariables.checkKeyInNS( entry );
        this.globusRSL.checkKeyInNS( entry );
        this.envVariables.checkKeyInNS( entry );
        this.vdsNS.checkKeyInNS( entry );
        this.hints.checkKeyInNS( entry );
    }

    /**
     * Updates all the profile namespaces with the information specified by
     * the user in the properties file that applies to this job. Existing
     * information is updated and supplemental new information is added.
     * The method does not check whether the properties passed apply to this
     * job; the caller must ensure that.
     *
     * @param properties  the <code>PegasusProperties</code> object containing
     *                    the user properties.
     */
    public void updateProfiles(PegasusProperties properties){
        this.condorVariables.checkKeyInNS( properties, executionPool );
        this.dagmanVariables.checkKeyInNS( properties, executionPool );
        this.globusRSL.checkKeyInNS( properties, executionPool );
        this.envVariables.checkKeyInNS( properties, executionPool );
        this.vdsNS.checkKeyInNS( properties, executionPool );
        this.hints.checkKeyInNS( properties, executionPool );
    }
    
    /**
     * Updates all the profile namespaces with the information specified in
     * the <code>Profiles</code> object passed. It ends up updating already
     * existing information, and adds supplemental new information if
     * present.
     *
     * @param profiles  The <code>Profiles</code> that need to be incorporated
     *                  in the job's profile namespaces.
     */
    public void updateProfiles( Profiles profiles){
        if( profiles == null ){
            //nothing to put in the namespaces
            return;
        }

        //one helper call per namespace instead of six copy-pasted loops
        copyProfileKeys( profiles.get( NAMESPACES.condor ),  condorVariables );
        copyProfileKeys( profiles.get( NAMESPACES.globus ),  globusRSL );
        copyProfileKeys( profiles.get( NAMESPACES.env ),     envVariables );
        copyProfileKeys( profiles.get( NAMESPACES.pegasus ), vdsNS );
        copyProfileKeys( profiles.get( NAMESPACES.dagman ),  dagmanVariables );
        copyProfileKeys( profiles.get( NAMESPACES.hints ),   hints );
    }

    /**
     * Copies every key from a source namespace into one of this job's
     * profile namespaces via checkKeyInNS.
     *
     * @param source       the namespace whose keys are to be copied.
     * @param destination  the job profile namespace that is updated.
     */
    private void copyProfileKeys( Namespace source, Namespace destination ){
        for( Iterator it = source.getProfileKeyIterator(); it.hasNext(); ){
            String key = (String)it.next();
            destination.checkKeyInNS( key, (String)source.get( key ) );
        }
    }


    /**
     * Updates all the profile namespaces with the information specified in
     * the list of <code>Profile</code> objects passed. Pool catalog returns
     * profile information as such a list that needs to be propagated to the
     * job. It ends up updating already existing information, and adds
     * supplemental new information if present.
     *
     * @param profiles  the list of <code>Profile</code> objects that need to
     *                  be incorporated in the job's profile namespaces.
     */
    public void updateProfiles(List profiles){
        if(profiles == null || profiles.isEmpty()){
            //nothing to put in the namespaces
            return;
        }

        for( Object o : profiles ){
            Profile profile = (Profile)o;
            //hoist the namespace lookup: it was previously queried up to
            //six times per profile
            String ns = profile.getProfileNamespace();

            if( ns.equals(Profile.CONDOR) ){
                condorVariables.checkKeyInNS(profile);
            }
            else if( ns.equals(Profile.GLOBUS) ){
                globusRSL.checkKeyInNS(profile);
            }
            else if( ns.equals(Profile.ENV) ){
                envVariables.checkKeyInNS(profile);
            }
            else if( ns.equals(Profile.VDS) ){
                vdsNS.checkKeyInNS(profile);
            }
            else if( ns.equals(Profile.DAGMAN) ){
                dagmanVariables.checkKeyInNS(profile);
            }
            else if( ns.equals(Profile.HINTS) ){
                hints.checkKeyInNS(profile);
            }
            else{
                //unknown profile.
                mLogger.log("Unknown Profile: " + profile + " for job" +
                            this.jobName,LogManager.WARNING_MESSAGE_LEVEL);
            }
        }
    }

    /**
     * Merges profiles from another job into this job in a controlled
     * fashion. How a profile merges depends on the namespace it belongs
     * to: some profiles may be overridden, others summed up, etc.
     *
     * @param job  the <code>Job</code> object containing the job description
     *             for the job whose profiles have to be merged into this job.
     */
    public void mergeProfiles( Job job ){
        globusRSL.merge( job.globusRSL );
        envVariables.merge( job.envVariables );
        condorVariables.merge( job.condorVariables );
        dagmanVariables.merge( job.dagmanVariables );
        vdsNS.merge( job.vdsNS );
        hints.merge( job.hints );
    }

    /**
     * Checks if an object is similar to the one referred to by this class.
     * We compare the primary key (jobName) to determine equality.
     *
     * NOTE(review): ensure hashCode() elsewhere in this class stays
     * consistent with this definition.
     *
     * @param obj  the object with which to compare.
     *
     * @return true if the primary key (jobName) matches, else false.
     */
    public boolean equals(Object obj){
        if(obj instanceof Job){
            Job job = (Job) obj;

            //null-safe comparison: the redundant "? true : false" is gone
            //and a job with an unset (null) name no longer throws an NPE
            return (this.jobName == null) ?
                   job.jobName == null :
                   this.jobName.equals( job.jobName );
        }
        //objects are of different type. cannot be compared
        return false;
    }

    /**
     * Returns a boolean value denoting whether the job is MPI or not.
     * If no job type is specified in the globus rsl for the job, the job is
     * assumed to be non mpi.
     *
     * @return boolean  true  if jobtype=mpi set in the globus rsl.
     *                  false in all other cases.
     */
    public boolean isMPIJob(){
        //sanity checks
        if( this.globusRSL == null || !globusRSL.containsKey("jobtype") ){
            return false;
        }

        String jobType = (String)globusRSL.get("jobtype");
        return jobType.equalsIgnoreCase("mpi");
    }


    /**
     * Returns whether a job should be run in the work directory or not.
     * If a job is not run in the work directory, then it should be run
     * in the submit directory. That is the case when the job has been
     * scheduled to site "local" and its class corresponds to one of the
     * auxiliary jobs created by Pegasus.
     *
     * @return boolean true to indicate the job can run in the work
     *                 directory, false otherwise.
     */
    public boolean runInWorkDirectory(){
        //the job runs in the submit directory only when it is an auxiliary
        //job scheduled on the local site
        boolean auxiliaryOnLocal = executionPool != null
                && executionPool.equalsIgnoreCase("local")
                && jobClass > Job.COMPUTE_JOB
                && jobClass <= Job.CREATE_DIR_JOB;
        return !auxiliaryOnLocal;
    }

    /**
     * Resets all the profiles associated with the job by replacing each
     * namespace with a fresh, empty instance.
     */
    public void resetProfiles(){
        this.envVariables    = new ENV();
        this.globusRSL       = new Globus();
        this.condorVariables = new Condor();
        this.dagmanVariables = new Dagman();
        this.hints           = new Hints();
        this.vdsNS           = new Pegasus();
    }

    /**
     * Returns a textual description of the object, one {key -> value} entry
     * per field, wrapped in square brackets.
     *
     * @return textual description of the job.
     */
    public String toString(){
        String str = this.globusRSL == null ? null : this.globusRSL.toString();
        String cVar = this.condorVariables == null ? null : this.condorVariables.toString();
        String envStr = this.envVariables == null ? null : this.envVariables.toString();

        //StringBuffer kept because the private append() helper takes one
        StringBuffer sb = new StringBuffer();
        String newline = System.getProperty( "line.separator", "\r\n" );

        sb.append("[");
        append( sb, "Job Name", this.jobName , newline );
        append( sb, "Logical Id", this.logicalId , newline );
        append( sb, "Transformation", this.getCompleteTCName() , newline );
        append( sb, "Derivation", this.getCompleteDVName() , newline );
        //Integer.toString instead of the deprecated new Integer(i).toString()
        append( sb, "Level", Integer.toString( this.level ) , newline );
        append( sb, "Job Type Description", getJobTypeDescription(this.jobClass) , newline );
        append( sb, "Job Id" , this.jobID , newline );
        append( sb, "Runtime", this.mRuntime, newline  );
        append( sb, "Executable" , this.executable , newline );
        append( sb, "Directory", this.mDirectory, newline );
        append( sb, "Condor Universe" , this.condorUniverse , newline );
        append( sb, "Globus Scheduler" , this.globusScheduler , newline );
        append( sb, "Standard Output" ,  this.stdOut , newline );
        append( sb, "Standard Input" , this.stdIn , newline );
        append( sb, "Standard Error" , this.stdErr , newline );
        append( sb, "Argument String" , this.strargs , newline );
        append( sb, "Execution Site", this.executionPool , newline );
        append( sb, "Staging Site", this.mStagingSite , newline );
        append( sb, "Globus RSL" ,  str , newline );
        append( sb, "Environment Variables" , envStr , newline );
        append( sb, "Dagman Variables" , this.dagmanVariables , newline );
        append( sb, "Hints" , this.hints , newline );
        append( sb, "Input Files " , this.inputFiles , newline );
        append( sb, "Output Files ", this.outputFiles , newline );
        append( sb, "Condor Variables\n" , cVar , newline );
        append( sb, "VDS Profiles" , vdsNS , newline );
        append( sb, "Notifications", this.mNotifications, newline );
        append( sb, "Credentials", this.mCredentialsType, newline );
        sb.append("]");
        return sb.toString();
    }

    /**
     * Returns the DOT description of the object, used for visualizing
     * the workflow.
     *
     * @return String containing the DOT representation of the job.
     *
     * @exception IOException if something fishy happens to the stream.
     */
    public String toDOT() throws IOException{
        StringWriter writer = new StringWriter(32);
        this.toDOT( writer, "" );
        return writer.toString();
    }

    /**
     * Writes the DOT description of the object to a stream. This is used
     * for visualizing the workflow. The node is emitted as
     * "id" [color=...,style=filled,label="id"] followed by a line
     * separator.
     *
     * @param stream is a stream opened and ready for writing. This can also
     *               be a StringWriter for efficient output.
     * @param indent  is a <code>String</code> of spaces used for pretty
     *                printing. The initial amount of spaces should be an
     *                empty string. The parameter is used internally for the
     *                recursive traversal.
     *
     * @exception IOException if something fishy happens to the stream.
     */
    public void toDOT( Writer stream, String indent ) throws IOException {
        String newLine = System.getProperty( "line.separator", "\r\n" );
        String id = getID();

        //assemble the node description once, then write it in one shot
        StringBuilder node = new StringBuilder();
        node.append( indent )
            .append( "\"" ).append( id ).append( "\"" )
            .append( " " )
            .append( "[" )
            .append( "color=" ).append( getDOTColor() ).append( "," )
            .append( "style=filled," )
            .append( "label=" ).append( "\"" ).append( id ).append( "\"" )
            .append( "]" )
            .append( newLine );

        stream.write( node.toString() );
        stream.flush();
    }


    /**
     * Returns the color with which DOT should color the node representing
     * the job. The numeric case labels mirror the job-type constants; see
     * the inline comments (kept from the original — TODO confirm they
     * match the current constant values).
     *
     * @return the color name, "grey" for unrecognized types.
     */
    protected String getDOTColor(){
        switch ( this.getJobType() ){

            case 1: //this.COMPUTE_JOB:
                return "blueviolet";

            case 2: //this.STAGE_IN_JOB:
                return "gold";

            case 3: //this.STAGE_OUT_JOB:
                return "goldenrod";

            case 5: //this.INTER_POOL_JOB:
                return "goldenrod4";

            case 4: //this.REPLICA_REG_JOB:
                return "orange";

            case 6: //this.CREATE_DIR_JOB:
                return "darkturquoise";

            case 7: //this.STAGED_COMPUTE_JOB:
                return "violet";

            case 8: //this.CLEANUP_JOB:
                return "deepskyblue";

            default:
                return "grey";
        }
    }


    /**
     * Appends a {key -> value}, entry to the StringBuffer, preceded by
     * a line separator.
     *
     * @param sb      StringBuffer to which the mapping has to be appended.
     * @param key     the field.
     * @param value   the value of the field.
     * @param newLine the line separator to be used.
     */
    private void append(StringBuffer sb, String key, Object value, String newLine){
        sb.append( newLine )
          .append( "{" )
          .append( key )
          .append( " -> " )
          .append( value )
          .append( "}" )
          .append( "," );
    }

    /**
     * Adds a profile to the job object by dispatching on the first
     * character of the profile namespace name.
     *
     * @param p  the profile to be added.
     */
    public void addProfile( Profile p ) {
        String namespace = p.getProfileNamespace();
        String key       = p.getProfileKey();
        String value     = p.getProfileValue();

        //select the target namespace, then apply the key once
        Namespace target;
        switch( namespace.charAt(0) ){
            case 'c': //condor
                target = this.condorVariables;
                break;
            case 'd': //dagman
                target = this.dagmanVariables;
                break;
            case 'e': //env
                target = this.envVariables;
                break;
            case 'g': //globus
                target = this.globusRSL;
                break;
            case 'h': //hint
                target = this.hints;
                break;
            case 'p': //pegasus
                target = this.vdsNS;
                break;
            default:
                //ignore should not come here ever.
                mLogger.log("Namespace not supported. ignoring "+ namespace,
                        LogManager.WARNING_MESSAGE_LEVEL);
                return;
        }
        target.checkKeyInNS( key, value );
    }


}
